package com.xiaofan.java;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Flink streaming job that reads string records from a Kafka topic through the 0.11 connector
 * and prints them.
 *
 * <p>The topic, broker list and consumer group default to the constants declared below. They can
 * be overridden positionally on the command line as {@code [topic] [bootstrap.servers] [group.id]};
 * missing or blank arguments fall back to the defaults, so launching without arguments behaves
 * exactly as before.
 *
 * <p>Useful Kafka commands for the test cluster:
 * <ul>
 *   <li>List topics: {@code bin/kafka-topics.sh --list --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181}</li>
 *   <li>Create the topic: {@code bin/kafka-topics.sh --create --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --replication-factor 2 --partitions 3 --topic flink_kafka_source_A0003}</li>
 *   <li>Delete the topic: {@code bin/kafka-topics.sh --delete --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --topic flink_kafka_source_A0003}</li>
 *   <li>Produce data: {@code bin/kafka-console-producer.sh --broker-list 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --topic flink_kafka_source_A0003}</li>
 *   <li>Consume data: {@code bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --from-beginning --topic flink_kafka_source_A0003}</li>
 * </ul>
 *
 * <p>Run on Flink-on-YARN (optional positional arguments go after the jar path):
 * {@code bin/flink run -m yarn-cluster /home/hadoop/fanjh/project/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar}
 */
public class StreamingKafkaSource_A0003 {

    /** Topic consumed when no first argument is supplied. */
    private static final String DEFAULT_TOPIC = "flink_kafka_source_A0003";

    /** Kafka broker list used when no second argument is supplied. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS =
            "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091";

    /** Kafka consumer group used when no third argument is supplied. */
    private static final String DEFAULT_GROUP_ID = "A0003";

    /** A checkpoint is triggered every 5000 ms. */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;

    /** At least 500 ms must elapse between the end of one checkpoint and the start of the next. */
    private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS_MS = 500L;

    /** A checkpoint that does not complete within one minute is discarded. */
    private static final long CHECKPOINT_TIMEOUT_MS = 60000L;

    /**
     * Builds and runs the Kafka-to-stdout pipeline.
     *
     * @param args optional positional overrides: {@code [topic] [bootstrap.servers] [group.id]}
     * @throws Exception propagated from {@link StreamExecutionEnvironment#execute(String)}
     */
    public static void main(String[] args) throws Exception {
        String topic = argOrDefault(args, 0, DEFAULT_TOPIC);
        String bootstrapServers = argOrDefault(args, 1, DEFAULT_BOOTSTRAP_SERVERS);
        String groupId = argOrDefault(args, 2, DEFAULT_GROUP_ID);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        configureCheckpointing(env);

        // State backend: none is set here, so the environment/cluster default applies.
        // Alternatives kept for reference:
        // env.setStateBackend(new RocksDBStateBackend("hdfs://cluster/rocksdb", true));
        // env.setStateBackend(new FsStateBackend("file://" + System.getProperty("user.dir") + "/WorkSpace/checkpoints"));
        // env.setStateBackend(new FsStateBackend("hdfs://node02:8020/com.elephent.flink/checkpoints"));

        FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011<>(
                topic, new SimpleStringSchema(), buildKafkaProperties(bootstrapServers, groupId));
        // Start from the consumer group's committed offsets (the connector's default strategy).
        kafkaSource.setStartFromGroupOffsets();

        DataStreamSource<String> text = env.addSource(kafkaSource);
        text.print().setParallelism(1);

        env.execute("StreamingKafkaSource_A0003");
    }

    /** Applies the job's checkpointing policy to {@code env}. */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        CheckpointConfig config = env.getCheckpointConfig();
        // Exactly-once is already the default mode; set explicitly for clarity.
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        config.setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS_MS);
        config.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        // Only one checkpoint may be in flight at a time.
        config.setMaxConcurrentCheckpoints(1);
        // RETAIN_ON_CANCELLATION keeps checkpoint data after the job is cancelled so it can be
        // restored from later. DELETE_ON_CANCELLATION would delete it on cancel and only keep it
        // when the job fails.
        // NOTE(review): retained (externalized) checkpoints need a checkpoint directory, e.g.
        // `state.checkpoints.dir` in flink-conf.yaml or a state backend with a path; none is set in
        // this job, so confirm the target cluster's configuration provides one.
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

    /** Returns the Kafka consumer properties for the given broker list and consumer group. */
    private static Properties buildKafkaProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        return properties;
    }

    /** Returns the trimmed {@code args[index]} when present and non-blank, otherwise {@code fallback}. */
    private static String argOrDefault(String[] args, int index, String fallback) {
        if (args == null || index < 0 || index >= args.length || args[index] == null) {
            return fallback;
        }
        String value = args[index].trim();
        return value.isEmpty() ? fallback : value;
    }
}
